查看原文
其他

《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

zhisheng zhisheng 2021-05-26


前言

之前其实在 《从0到1学习Flink》—— 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的东西当时是写死的,不能够通用,最近知识星球里有朋友叫我: 写个从 kafka 中读取数据,经过 Flink 做个预聚合,然后创建数据库连接池将数据批量写入到 mysql 的例子。

于是才有了这篇文章,更多提问和想要我写的文章可以在知识星球里像我提问,我会根据提问及时回答和尽可能作出文章的修改。

准备

你需要将这两个依赖添加到 pom.xml 中

1<dependency>
2    <groupId>mysql</groupId>
3    <artifactId>mysql-connector-java</artifactId>
4    <version>5.1.34</version>
5</dependency>

读取 kafka 数据

这里我依旧用的以前的 student 类,自己本地起了 kafka 然后造一些测试数据,这里我们测试发送一条数据则 sleep 10s,意味着往 kafka 中一分钟发 6 条数据。

1package com.zhisheng.connectors.mysql.utils;
2
3import com.zhisheng.common.utils.GsonUtil;
4import com.zhisheng.connectors.mysql.model.Student;
5import org.apache.kafka.clients.producer.KafkaProducer;
6import org.apache.kafka.clients.producer.ProducerRecord;
7
8import java.util.Properties;
9
10/**
11 * Desc: 往kafka中写数据,可以使用这个main函数进行测试
12 * Created by zhisheng on 2019-02-17
13 * Blog: http://www.54tianzhisheng.cn/tags/Flink/
14 */

15public class KafkaUtil {
16    public static final String broker_list = "localhost:9092";
17    public static final String topic = "student";  //kafka topic 需要和 flink 程序用同一个 topic
18
19    public static void writeToKafka() throws InterruptedException {
20        Properties props = new Properties();
21        props.put("bootstrap.servers", broker_list);
22        props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
23        props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
24        KafkaProducer producer = new KafkaProducer<String, String>(props);
25
26        for (int i = 1; i <= 100; i++) {
27            Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);
28            ProducerRecord record = new ProducerRecord<String, String>(topic, nullnull, GsonUtil.toJson(student));
29            producer.send(record);
30            System.out.println("发送数据: " + GsonUtil.toJson(student));
31            Thread.sleep(10 * 1000); //发送一条数据 sleep 10s,相当于 1 分钟 6 条
32        }
33        producer.flush();
34    }
35
36    public static void main(String[] args) throws InterruptedException {
37        writeToKafka();
38    }
39}

从 kafka 中读取数据,然后序列化成 student 对象。

1final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2Properties props = new Properties();
3props.put("bootstrap.servers""localhost:9092");
4props.put("zookeeper.connect""localhost:2181");
5props.put("group.id""metric-group");
6props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
7props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
8props.put("auto.offset.reset""latest");
9
10SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>(
11        "student",   //这个 kafka topic 需要和上面的工具类的 topic 一致
12        new SimpleStringSchema(),
13        props)).setParallelism(1)
14        .map(string -> GsonUtil.fromJson(string, Student.class)); //,解析字符串成 student 对象

因为 RichSinkFunction 中如果 sink 一条数据到 mysql 中就会调用 invoke 方法一次,所以如果要实现批量写的话,我们最好在 sink 之前就把数据聚合一下。那这里我们开个一分钟的窗口去聚合 Student 数据。

1student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
2    @Override
3    public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {
4        ArrayList<Student> students = Lists.newArrayList(values);
5        if (students.size() > 0) {
6            System.out.println("1 分钟内收集到 student 的数据条数是:" + students.size());
7            out.collect(students);
8        }
9    }
10});

写入数据库

这里使用 DBCP 连接池连接数据库 mysql,pom.xml 中添加依赖:

1<dependency>
2    <groupId>org.apache.commons</groupId>
3    <artifactId>commons-dbcp2</artifactId>
4    <version>2.1.1</version>
5</dependency>

如果你想使用其他的数据库连接池请加入对应的依赖。

这里将数据写入到 MySQL 中,依旧是和之前文章一样继承 RichSinkFunction 类,重写里面的方法:

1package com.zhisheng.connectors.mysql.sinks;
2
3import com.zhisheng.connectors.mysql.model.Student;
4import org.apache.commons.dbcp2.BasicDataSource;
5import org.apache.flink.configuration.Configuration;
6import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
7
8import javax.sql.DataSource;
9import java.sql.Connection;
10import java.sql.DriverManager;
11import java.sql.PreparedStatement;
12import java.util.List;
13
14/**
15 * Desc: 数据批量 sink 数据到 mysql
16 * Created by zhisheng_tian on 2019-02-17
17 * Blog: http://www.54tianzhisheng.cn/tags/Flink/
18 */

19public class SinkToMySQL extends RichSinkFunction<List<Student>> {
20    PreparedStatement ps;
21    BasicDataSource dataSource;
22    private Connection connection;
23
24    /**
25     * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
26     *
27     * @param parameters
28     * @throws Exception
29     */

30    @Override
31    public void open(Configuration parameters) throws Exception {
32        super.open(parameters);
33        dataSource = new BasicDataSource();
34        connection = getConnection(dataSource);
35        String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
36        ps = this.connection.prepareStatement(sql);
37    }
38
39    @Override
40    public void close() throws Exception {
41        super.close();
42        //关闭连接和释放资源
43        if (connection != null) {
44            connection.close();
45        }
46        if (ps != null) {
47            ps.close();
48        }
49    }
50
51    /**
52     * 每条数据的插入都要调用一次 invoke() 方法
53     *
54     * @param value
55     * @param context
56     * @throws Exception
57     */

58    @Override
59    public void invoke(List<Student> value, Context context) throws Exception {
60        //遍历数据集合
61        for (Student student : value) {
62            ps.setInt(1, student.getId());
63            ps.setString(2, student.getName());
64            ps.setString(3, student.getPassword());
65            ps.setInt(4, student.getAge());
66            ps.addBatch();
67        }
68        int[] count = ps.executeBatch();//批量后执行
69        System.out.println("成功了插入了" + count.length + "行数据");
70    }
71
72
73    private static Connection getConnection(BasicDataSource dataSource) {
74        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
75        //注意,替换成自己本地的 mysql 数据库地址和用户名、密码
76        dataSource.setUrl("jdbc:mysql://localhost:3306/test");
77        dataSource.setUsername("root");
78        dataSource.setPassword("root123456");
79        //设置连接池的一些参数
80        dataSource.setInitialSize(10);
81        dataSource.setMaxTotal(50);
82        dataSource.setMinIdle(2);
83
84        Connection con = null;
85        try {
86            con = dataSource.getConnection();
87            System.out.println("创建连接池:" + con);
88        } catch (Exception e) {
89            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
90        }
91        return con;
92    }
93}

核心类 Main

核心程序如下:

1public class Main {
2    public static void main(String[] args) throws Exception{
3        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
4        Properties props = new Properties();
5        props.put("bootstrap.servers""localhost:9092");
6        props.put("zookeeper.connect""localhost:2181");
7        props.put("group.id""metric-group");
8        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
9        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
10        props.put("auto.offset.reset""latest");
11
12        SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>(
13                "student",   //这个 kafka topic 需要和上面的工具类的 topic 一致
14                new SimpleStringSchema(),
15                props)).setParallelism(1)
16                .map(string -> GsonUtil.fromJson(string, Student.class)); //
17        student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
18            @Override
19            public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {
20                ArrayList<Student> students = Lists.newArrayList(values);
21                if (students.size() > 0) {
22                    System.out.println("1 分钟内收集到 student 的数据条数是:" + students.size());
23                    out.collect(students);
24                }
25            }
26        }).addSink(new SinkToMySQL());
27
28        env.execute("flink learning connectors kafka");
29    }
30}

运行项目

运行 Main 类后再运行 KafkaUtils.java 类!

下图是往 Kafka 中发送的数据:

下图是运行 Main 类的日志,会创建 4 个连接池是因为默认的 4 个并行度,你如果在 addSink 这个算子设置并行度为 1 的话就会创建一个连接池:

下图是批量插入数据库的结果:

总结

本文从知识星球一位朋友的疑问来写的,应该都满足了他的条件(批量/数据库连接池/写入mysql),的确网上很多的例子都是简单的 demo 形式,都是单条数据就创建数据库连接插入 MySQL,如果要写的数据量很大的话,会对 MySQL 的写有很大的压力。这也是我之前在 《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch 中,数据写 ES 强调过的,如果要提高性能必定要批量的写。就拿我们现在这篇文章来说,如果数据量大的话,聚合一分钟数据达万条,那么这样批量写会比来一条写一条性能提高不知道有多少。

本文原创地址是: http://www.54tianzhisheng.cn/2019/01/15/Flink-MySQL-sink/ , 未经允许禁止转载。

关注我

微信公众号:zhisheng

另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以加我的微信:zhisheng_tian,然后回复关键字:Flink 即可无条件获取到。

更多私密资料请加入知识星球!

Github 代码仓库

https://github.com/zhisheng17/flink-learning/

以后这个项目的所有代码都将放在这个仓库里,包含了自己学习 flink 的一些 demo 和博客,欢迎点 star!

本文的项目代码在 https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-connectors/flink-learning-connectors-mysql

相关文章

1、《从0到1学习Flink》—— Apache Flink 介绍

2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

3、《从0到1学习Flink》—— Flink 配置文件详解

4、《从0到1学习Flink》—— Data Source 介绍

5、《从0到1学习Flink》—— 如何自定义 Data Source ?

6、《从0到1学习Flink》—— Data Sink 介绍

7、《从0到1学习Flink》—— 如何自定义 Data Sink ?

8、《从0到1学习Flink》—— Flink Data transformation(转换)

9、《从0到1学习Flink》—— 介绍Flink中的Stream Windows

10、《从0到1学习Flink》—— Flink 中的几种 Time 详解

11、《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch

12、《从0到1学习Flink》—— Flink 项目如何运行?

13、《从0到1学习Flink》—— Flink 写入数据到 Kafka

14、《从0到1学习Flink》—— Flink JobManager 高可用性配置

15、《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍

    您可能也对以下帖子感兴趣

    Spring Boot 3.3 嵌套事务 REQUIRES_NEW 与 NESTED 实现
    Spring Boot3.3 项目数据库连接使用SM4国密加密算法进行脱敏配置
    在 SpringBoot3.3 中拦截修改请求 Body 的多种正确方式
    SpringBoot+mail 轻松实现各类邮件自动推送
    实操:关于Flink中文件读取,从实验中看网上的那些博客对吗?

    文章有问题?点此查看未经处理的缓存